package com.atguigu.gmall1118.app

import com.atguigu.gmall1118.bean.TagInfo
import com.atguigu.gmall1118.dao.TagInfoDAO
import com.atguigu.gmall1118.util.MyPropertiesUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

object TaskMergeApp {

  // Design notes (translated from the original comments):
  // 1. How do we know which single-tag tables exist?
  //    Either query the enabled tags from MySQL, or use SHOW TABLES.
  //    -> We query MySQL for the list of enabled TagInfo rows.
  // 2. Wide-table creation: manual vs. generated; one table per day vs. one
  //    partitioned table (warehouse style). The tag set can change between
  //    days (e.g. 50 tags yesterday, 30 more today), so a fresh table is
  //    built per day: user_tag_merge_20220621, user_tag_merge_20220622, ...
  // 3. SQL assembly: UNION ALL the per-tag tables, PIVOT rows into columns,
  //    and prepend INSERT — a single insert-select statement moves every
  //    tag's data into the wide table.

  /**
   * Entry point. Merges every enabled single-tag table for one business date
   * into a per-day wide table (one column per tag code, one row per uid).
   *
   * @param args args(0) = task id (read but currently unused — kept to honor
   *             the scheduler's CLI contract), args(1) = business date in
   *             "yyyy-MM-dd" form.
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage hint instead of an opaque ArrayIndexOutOfBoundsException.
    require(args.length >= 2, "usage: TaskMergeApp <taskId> <busiDate yyyy-MM-dd>")
    val taskId = args(0)
    val busiDate = args(1)

    val sparkConf: SparkConf = new SparkConf().setAppName("task_merge_app").setMaster("local[*]")
    val sparkSession: SparkSession =
      SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    try {
      // 1. The enabled tags drive both the wide table's columns and the UNION branches.
      val tagInfoList: List[TagInfo] = TagInfoDAO.getTagInfoListWithOn()
      // An empty tag list would otherwise generate invalid DDL: "(uid string, )".
      require(tagInfoList.nonEmpty, "no enabled tags found; nothing to merge")

      // 2. One wide table per day: user_tag_merge_yyyyMMdd.
      val tableName = s"user_tag_merge_${busiDate.replace("-", "")}"

      // NOTE(review): tag codes are interpolated straight into SQL. They come
      // from our own MySQL config table today, but sanitize/validate them at
      // the source if that table ever becomes user-editable.
      val fieldList: List[String] =
        tagInfoList.map(tagInfo => s"${tagInfo.tagCode.toLowerCase} string")
      val fieldsSql: String = fieldList.mkString(",")

      val properties: Properties = MyPropertiesUtil.load("config.properties")
      val hdfsPath: String = properties.getProperty("hdfs-store.path")
      // NOTE(review): loaded but never used below — confirm whether the tag
      // tables in the UNION should actually be qualified with this warehouse
      // db name instead of resolving in the user-profile db.
      val gmallDbName: String = properties.getProperty("data-warehouse.dbname")
      val upDbName: String = properties.getProperty("user-profile.dbname")

      val createTableSQL =
        s"""
           | create table if not exists $upDbName.$tableName
           | (uid string,$fieldsSql )
           |  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
           |  location '$hdfsPath/$upDbName/$tableName'
           |""".stripMargin

      // Drop-then-create so a rerun for the same date rebuilds the schema
      // (the enabled tag set may have changed since the previous run).
      val dropSQL = s"drop table if exists  $upDbName.$tableName"
      println(dropSQL)
      sparkSession.sql(dropSQL)
      println(createTableSQL)
      sparkSession.sql(createTableSQL)

      // 3. One SELECT per tag table, emitting (uid, tag_code, tag_value) rows
      //    for the business date, then UNION ALL them together.
      val tagSQLList: List[String] = tagInfoList.map(tagInfo =>
        s" select uid,'${tagInfo.tagCode.toLowerCase}' tag_code,tag_value from ${tagInfo.tagCode.toLowerCase} where dt='${busiDate}' ")
      val unionSQL: String = tagSQLList.mkString(" union all ")

      // Quoted tag codes for the PIVOT's IN list.
      val tagCodeList: List[String] =
        tagInfoList.map(tagInfo => "'" + tagInfo.tagCode.toLowerCase + "'")
      val tagCodeInSQL: String = tagCodeList.mkString(",")

      // PIVOT: dimension column = uid, spread column = tag_code,
      // aggregate = max(tag_value) (each (uid, tag_code) pair is unique, so
      // max just picks the single value).
      val pivotSQL =
        s"""
           |select * from ( $unionSQL) pivot ( max(tag_value) tag_value  for  tag_code in ($tagCodeInSQL ) )
           |
           |""".stripMargin
      println(pivotSQL)

      val insertSQL = s" insert overwrite table $tableName  $pivotSQL"
      println(insertSQL)
      // Unqualified table names (the wide table and the per-tag tables)
      // resolve in the user-profile database.
      sparkSession.sql(s" use $upDbName")
      sparkSession.sql(insertSQL)
    } finally {
      // Always release the Spark application's resources, even when a
      // statement above throws.
      sparkSession.stop()
    }
  }
}
